-- |
-- Module      : Streamly.Internal.Data.SVar.Type
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.SVar.Type
    (
    -- * Parent child communication
      ThreadAbort (..)
    , ChildEvent (..)
    , RunInIO(..)
    , AheadHeapEntry (..)

    -- * SVar
    , Count (..)
    , Limit (..)
    , SVarStyle (..)
    , SVarStopStyle (..)
    , SVarStats (..)
    , WorkerInfo (..)
    , PushBufferPolicy(..)
    , LatencyRange (..)
    , YieldRateInfo (..)
    , SVar (..)

    -- * State threaded around the stream
    , Rate (..)
    , State (streamVar)

    -- ** Default State
    , magicMaxBuffer
    , defState

    -- ** Type cast
    , adaptState

    -- ** State accessors
    , getMaxThreads
    , setMaxThreads
    , getMaxBuffer
    , setMaxBuffer
    , getStreamRate
    , setStreamRate
    , getStreamLatency
    , setStreamLatency
    , getYieldLimit
    , setYieldLimit
    , getInspectMode
    , setInspectMode
    )
where

import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Exception (SomeException(..), Exception)
#ifndef USE_UNLIFTIO
import Control.Monad.Trans.Control (MonadBaseControl(StM))
#endif
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.IORef (IORef)
import Data.Kind (Type)
import Data.Set (Set)

import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..))

newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )

------------------------------------------------------------------------------
-- Parent child thread communication type
------------------------------------------------------------------------------

data ThreadAbort = ThreadAbort deriving Show

instance Exception ThreadAbort

-- | Events that a child thread may send to a parent thread.
data ChildEvent a =
      ChildYield a
    | ChildStop ThreadId (Maybe SomeException)

#ifdef USE_UNLIFTIO
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO b }
#else
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO (StM m b) }
#endif

-- | Sorting out-of-turn outputs in a heap for Ahead style streams
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
    | AheadEntryPure a
    | AheadEntryStream (RunInIO m, t m a)
#undef Type

------------------------------------------------------------------------------
-- SVar: the state for thread management
------------------------------------------------------------------------------

-- | Identify the type of the SVar. Two computations using the same style can
-- be scheduled on the same SVar.
data SVarStyle =
      AsyncVar             -- depth first concurrent
    | WAsyncVar            -- breadth first concurrent
    | ParallelVar          -- all parallel
    | AheadVar             -- Concurrent look ahead
    deriving (Eq, Show)

-- | An SVar or a Stream Var is a conduit to the output from multiple streams
-- running concurrently and asynchronously. An SVar can be thought of as an
-- asynchronous IO handle. We can write any number of streams to an SVar in a
-- non-blocking manner and then read them back at any time at any pace.  The
-- SVar would run the streams asynchronously and accumulate results. An SVar
-- may not really execute the stream completely and accumulate all the results.
-- However, it ensures that the reader can read the results at whatever paces
-- it wants to read. The SVar monitors and adapts to the consumer's pace.
--
-- An SVar is a mini scheduler, it has an associated workLoop that holds the
-- stream tasks to be picked and run by a pool of worker threads. It has an
-- associated output queue where the output stream elements are placed by the
-- worker threads. A outputDoorBell is used by the worker threads to intimate the
-- consumer thread about availability of new results in the output queue. More
-- workers are added to the SVar by 'fromStreamVar' on demand if the output
-- produced is not keeping pace with the consumer. On bounded SVars, workers
-- block on the output queue to provide throttling of the producer  when the
-- consumer is not pulling fast enough.  The number of workers may even get
-- reduced depending on the consuming pace.
--
-- New work is enqueued either at the time of creation of the SVar or as a
-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
-- already enqueued computations get evaluated. See 'joinStreamVarAsync'.

-- We measure the individual worker latencies to estimate the number of workers
-- needed or the amount of time we have to sleep between dispatches to achieve
-- a particular rate when controlled pace mode it used.
data WorkerInfo = WorkerInfo
    { workerYieldMax   :: Count -- 0 means unlimited
    -- total number of yields by the worker till now
    , workerYieldCount    :: IORef Count
    -- yieldCount at start, timestamp
    , workerLatencyStart  :: IORef (Count, AbsTime)
    }

data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64
    , maxLatency :: NanoSecond64
    } deriving Show

-- Rate control.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget    :: NanoSecond64
    , svarLatencyRange     :: LatencyRange
    , svarRateBuffer       :: Int

    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , svarGainedLostYields :: IORef Count

    -- Actual latency/througput as seen from the consumer side, we count the
    -- yields and the time it took to generates those yields. This is used to
    -- increase or decrease the number of workers needed to achieve the desired
    -- rate. The idle time of workers is adjusted in this, so that we only
    -- account for the rate when the consumer actually demands data.
    -- XXX interval latency is enough, we can move this under diagnostics build
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , svarAllTimeLatency :: IORef (Count, AbsTime)

    -- XXX Worker latency specified by the user to be used before the first
    -- actual measurement arrives. Not yet implemented
    , workerBootstrapLatency :: Maybe NanoSecond64

    -- After how many yields the worker should update the latency information.
    -- If the latency is high, this count is kept lower and vice-versa.  XXX If
    -- the latency suddenly becomes too high this count may remain too high for
    -- long time, in such cases the consumer can change it.
    -- 0 means no latency computation
    -- XXX this is derivable from workerMeasuredLatency, can be removed.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , workerPollingInterval :: IORef Count

    -- This is in progress latency stats maintained by the workers which we
    -- empty into workerCollectedLatency stats at certain intervals - whenever
    -- we process the stream elements yielded in this period. The first count
    -- is all yields, the second count is only those yields for which the
    -- latency was measured to be non-zero (note that if the timer resolution
    -- is low the measured latency may be zero e.g. on JS platform).
    -- [LOCKING] Locked access. Modified by the consumer thread as well as
    -- worker threads. Workers modify it periodically based on
    -- workerPollingInterval and not on every yield to reduce the locking
    -- overhead.
    -- (allYieldCount, yieldCount, timeTaken)
    , workerPendingLatency   :: IORef (Count, Count, NanoSecond64)

    -- This is the second level stat which is an accmulation from
    -- workerPendingLatency stats. We keep accumulating latencies in this
    -- bucket until we have stats for a sufficient period and then we reset it
    -- to start collecting for the next period and retain the computed average
    -- latency for the last period in workerMeasuredLatency.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    -- (allYieldCount, yieldCount, timeTaken)
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    -- Latency as measured by workers, aggregated for the last period.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , workerMeasuredLatency :: IORef NanoSecond64
    }

data SVarStats = SVarStats {
      totalDispatches  :: IORef Int
    , maxWorkers       :: IORef Int
    , maxOutQSize      :: IORef Int
    , maxHeapSize      :: IORef Int
    , maxWorkQSize     :: IORef Int
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
    , minWorkerLatency :: IORef NanoSecond64
    , maxWorkerLatency :: IORef NanoSecond64
    , svarStopTime     :: IORef (Maybe AbsTime)
}

-- This is essentially a 'Maybe Word' type
data Limit = Unlimited | Limited Word deriving Show

instance Eq Limit where
    Unlimited == Unlimited = True
    Unlimited == Limited _ = False
    Limited _ == Unlimited = False
    Limited x == Limited y = x == y

instance Ord Limit where
    Unlimited <= Unlimited = True
    Unlimited <= Limited _ = False
    Limited _ <= Unlimited = True
    Limited x <= Limited y = x <= y

-- When to stop the composed stream.
data SVarStopStyle =
      StopNone -- stops only when all streams are finished
    | StopAny  -- stop when any stream finishes
    | StopBy   -- stop when a specific stream finishes
    deriving (Eq, Show)

-- | Buffering policy for persistent push workers (in ParallelT).  In a pull
-- style SVar (in AsyncT, AheadT etc.), the consumer side dispatches workers on
-- demand, workers terminate if the buffer is full or if the consumer is not
-- cosuming fast enough.  In a push style SVar, a worker is dispatched only
-- once, workers are persistent and keep pushing work to the consumer via a
-- bounded buffer. If the buffer becomes full the worker either blocks, or it
-- can drop an item from the buffer to make space.
--
-- Pull style SVars are useful in lazy stream evaluation whereas push style
-- SVars are useful in strict left Folds.
--
-- XXX Maybe we can separate the implementation in two different types instead
-- of using a common SVar type.
--
data PushBufferPolicy =
      PushBufferDropNew  -- drop the latest element and continue
    | PushBufferDropOld  -- drop the oldest element and continue
    | PushBufferBlock    -- block the thread until space
                         -- becomes available

-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
-- references to the original SVar stored in several functions which will keep
-- pointing to the original data and the new updates won't reflect there.
-- Any updateable parts must be kept in mutable references (IORef).
--
data SVar t m a = SVar
    {
    -- Read only state
      svarStyle       :: SVarStyle
    , svarMrun        :: RunInIO m
    , svarStopStyle   :: SVarStopStyle
    , svarStopBy      :: IORef ThreadId

    -- Shared output queue (events, length)
    -- XXX For better efficiency we can try a preallocated array type (perhaps
    -- something like a vector) that allows an O(1) append. That way we will
    -- avoid constructing and reversing the list. Possibly we can also avoid
    -- the GC copying overhead. When the size increases we should be able to
    -- allocate the array in chunks.
    --
    -- [LOCKING] Frequent locked access. This is updated by workers on each
    -- yield and once in a while read by the consumer thread. This could have
    -- big locking overhead if the number of workers is high.
    --
    -- XXX We can use a per-CPU data structure to reduce the locking overhead.
    -- However, a per-cpu structure cannot guarantee the exact sequence in
    -- which the elements were added, though that may not be important.
    , outputQueue    :: IORef ([ChildEvent a], Int)

    -- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
    -- to non-empty, or a work item is queued by a worker to the work queue and
    -- needDoorBell is set by the consumer.
    , outputDoorBell :: MVar ()  -- signal the consumer about output
    , readOutputQ    :: m [ChildEvent a]
    , postProcess    :: m Bool

    -- channel to send events from the consumer to the worker. Used to send
    -- exceptions from a fold driver to the fold computation running as a
    -- consumer thread in the concurrent fold cases. Currently only one event
    -- is sent by the fold so we do not really need a queue for it.
    , outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
    , outputDoorBellFromConsumer :: MVar ()

    -- Combined/aggregate parameters
    -- This is truncated to maxBufferLimit if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , maxWorkerLimit :: Limit
    , maxBufferLimit :: Limit
    -- These two are valid and used only when maxBufferLimit is Limited.
    , pushBufferSpace  :: IORef Count
    , pushBufferPolicy :: PushBufferPolicy
    -- [LOCKING] The consumer puts this MVar after emptying the buffer, workers
    -- block on it when the buffer becomes full. No overhead unless the buffer
    -- becomes full.
    , pushBufferMVar :: MVar ()

    -- [LOCKING] Read only access by consumer when dispatching a worker.
    -- Decremented by workers when picking work and undo decrement if the
    -- worker does not yield a value.
    , remainingWork  :: Maybe (IORef Count)
    , yieldRateInfo  :: Maybe YieldRateInfo

    -- Used only by bounded SVar types
    , enqueue        :: (RunInIO m, t m a) -> IO ()
    , isWorkDone     :: IO Bool
    , isQueueDone    :: IO Bool
    , needDoorBell   :: IORef Bool
    , workLoop       :: Maybe WorkerInfo -> m ()

    -- Shared, thread tracking
    -- [LOCKING] Updated unlocked only by consumer thread in case of
    -- Async/Ahead style SVars. Updated locked by worker threads in case of
    -- Parallel style.
    , workerThreads  :: IORef (Set ThreadId)
    -- [LOCKING] Updated locked by consumer thread when dispatching a worker
    -- and by the worker threads when the thread stops. This is read unsafely
    -- at several places where we want to rely on an approximate value.
    , workerCount    :: IORef Int
    , accountThread  :: ThreadId -> m ()
    , workerStopMVar :: MVar ()

    , svarStats      :: SVarStats
    -- to track garbage collection of SVar
    , svarRef        :: Maybe (IORef ())

    -- Only for diagnostics
    , svarInspectMode :: Bool
    , svarCreator    :: ThreadId
    , outputHeap     :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                              , Maybe Int)
    -- Shared work queue (stream, seqNo)
    , aheadWorkQueue :: IORef ([t m a], Int)
    }

-------------------------------------------------------------------------------
-- Overall state threaded around a stream
-------------------------------------------------------------------------------

-- | Specifies the stream yield rate in yields per second (@Hertz@).
-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
-- allow only as many yields as we have accumulated as per 'rateGoal' since the
-- start of time. If the consumer or the producer is slower or faster, the
-- actual rate may fall behind or exceed 'rateGoal'.  We try to recover the gap
-- between the two by increasing or decreasing the pull rate from the producer.
-- However, if the gap becomes more than 'rateBuffer' we try to recover only as
-- much as 'rateBuffer'.
--
-- 'rateLow' puts a bound on how low the instantaneous rate can go when
-- recovering the rate gap.  In other words, it determines the maximum yield
-- latency.  Similarly, 'rateHigh' puts a bound on how high the instantaneous
-- rate can go when recovering the rate gap.  In other words, it determines the
-- minimum yield latency. We reduce the latency by increasing concurrency,
-- therefore we can say that it puts an upper bound on concurrency.
--
-- If the 'rateGoal' is 0 or negative the stream never yields a value.
-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
data Rate = Rate
    { rateLow    :: Double -- ^ The lower rate limit
    , rateGoal   :: Double -- ^ The target rate we want to achieve
    , rateHigh   :: Double -- ^ The upper rate limit
    , rateBuffer :: Int    -- ^ Maximum slack from the goal
    }

-- XXX we can put the resettable fields in a oneShotConfig field and others in
-- a persistentConfig field. That way reset would be fast and scalable
-- irrespective of the number of fields.
--
-- XXX make all these Limited types and use phantom types to distinguish them
data State t m a = State
    { -- one shot configuration, automatically reset for each API call
      streamVar   :: Maybe (SVar t m a)
    , _yieldLimit  :: Maybe Count

    -- persistent configuration, state that remains valid until changed by
    -- an explicit setting via a combinator.
    , _threadsHigh    :: Limit
    , _bufferHigh     :: Limit
    -- XXX these two can be collapsed into a single type
    , _streamLatency  :: Maybe NanoSecond64 -- bootstrap latency
    , _maxStreamRate  :: Maybe Rate
    , _inspectMode    :: Bool
    }

-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------

-- A magical value for the buffer size arrived at by running the smallest
-- possible task and measuring the optimal value of the buffer for that.  This
-- is obviously dependent on hardware, this figure is based on a 2.2GHz intel
-- core-i7 processor.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads = Limited magicMaxBuffer
defaultMaxBuffer = Limited magicMaxBuffer

-- The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs.
defState :: State t m a
defState = State
    { streamVar = Nothing
    , _yieldLimit = Nothing
    , _threadsHigh = defaultMaxThreads
    , _bufferHigh = defaultMaxBuffer
    , _maxStreamRate = Nothing
    , _streamLatency = Nothing
    , _inspectMode = False
    }

-- XXX if perf gets affected we can have all the Nothing params in a single
-- structure so that we reset is fast. We can also use rewrite rules such that
-- reset occurs only in concurrent streams to reduce the impact on serial
-- streams.
-- We can optimize this so that we clear it only if it is a Just value, it
-- results in slightly better perf for zip/zipM but the performance of scan
-- worsens a lot, it does not fuse.
--
-- XXX This has a side effect of clearing the SVar and yieldLimit, therefore it
-- should not be used to convert from the same type to the same type, unless
-- you want to clear the SVar. For clearing the SVar you should be using the
-- appropriate unStream functions instead.
--
-- | Adapt the stream state from one type to another.
adaptState :: State t m a -> State t n b
adaptState st = st
    { streamVar = Nothing
    , _yieldLimit = Nothing
    }

-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------

-- Use get/set routines instead of directly accessing the State fields
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit lim st =
    st { _yieldLimit =
            case lim of
                Nothing -> Nothing
                Just n  ->
                    if n <= 0
                    then Just 0
                    else Just (fromIntegral n)
       }

getYieldLimit :: State t m a -> Maybe Count
getYieldLimit = _yieldLimit

setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads n st =
    st { _threadsHigh =
            if n < 0
            then Unlimited
            else if n == 0
                 then defaultMaxThreads
                 else Limited (fromIntegral n)
       }

getMaxThreads :: State t m a -> Limit
getMaxThreads = _threadsHigh

setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n st =
    st { _bufferHigh =
            if n < 0
            then Unlimited
            else if n == 0
                 then defaultMaxBuffer
                 else Limited (fromIntegral n)
       }

getMaxBuffer :: State t m a -> Limit
getMaxBuffer = _bufferHigh

setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate r st = st { _maxStreamRate = r }

getStreamRate :: State t m a -> Maybe Rate
getStreamRate = _maxStreamRate

setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st =
    st { _streamLatency =
            if n <= 0
            then Nothing
            else Just (fromIntegral n)
       }

getStreamLatency :: State t m a -> Maybe NanoSecond64
getStreamLatency = _streamLatency

setInspectMode :: State t m a -> State t m a
setInspectMode st = st { _inspectMode = True }

getInspectMode :: State t m a -> Bool
getInspectMode = _inspectMode
